-
Notifications
You must be signed in to change notification settings - Fork 305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Open FlyteFile from remote path #2991
[BUG] Open FlyteFile from remote path #2991
Conversation
Signed-off-by: JiaWei Jiang <[email protected]>
Follow-ups
|
Thank you for your change, this indeed looks useful! I have one suggestion:
You should be able to add |
Hi @eapolinario, Amazing, thanks very much for the guide. I'll add the corresponding integration test asap! |
Signed-off-by: JiaWei Jiang <[email protected]>
Hi @eapolinario, This patch is now covered by the newly added integration test. If there's anything that can be improved, please let me know. Thanks a lot! |
flytekit/types/file/file.py
Outdated
ff._remote_source = uri | ||
|
||
return ff | ||
|
||
@staticmethod | ||
def downloader(remote_path: str, local_path: str) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use the context that we pass to async_to_python_value?
def downloader(remote_path: str, local_path: str) -> None: | |
def downloader(ctx: FlyteContext, remote_path: str, local_path: str) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect!
We could just reuse the passed context instead of getting it again from FlyteContextManager
.
flytekit/types/file/file.py
Outdated
if ctx.file_access.is_remote(self.path) and self._remote_source is None: | ||
# Setup remote file source and local file destination | ||
self._remote_source = self.path | ||
local_path = ctx.file_access.get_random_local_path(self._remote_source) | ||
self._downloader = lambda: FlyteFilePathTransformer.downloader( | ||
remote_path=self._remote_source, local_path=local_path | ||
) | ||
self.path = local_path |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be better to move these code to __init__
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Kevin,
After some more tests, I observe that setting up these paths within __init__
leads to an error. This use case is described as follows:
- Initialize a
FlyteFile
with a remote path - Call
FlyteFilePathTransformer.async_to_literal
beforeFlyteFile
is opened
The error shows that the local file destination to which we download the remote file hasn't been created because the we do delayed downloading until users call open()
explicitly.
To be concrete, a non-existing source_path
will be used here. Hence, a FileNotFoundError
occurs.
If there's any suggestion, please let me know. Thanks a lot!
Signed-off-by: JiaWei Jiang <[email protected]>
1. Remove redundant prints 2. Use `mock.patch.dict` to setup `os.environ` for the current test fn * Avoid contaminating other tests running in the same process Signed-off-by: JiaWei Jiang <[email protected]>
We clean the test logic in the last commit following the suggestion here. |
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Code Review Agent Run #82f00fActionable Suggestions - 5
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
# Upload a file to minio s3 bucket | ||
file_transfer = SimpleFileTransfer() | ||
remote_file_path = file_transfer.upload_file(file_type="json") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test creates temporary files but relies on manual cleanup. Consider using a context manager or pytest.fixture
to ensure cleanup happens even if the test fails
Code suggestion
Check the AI-generated fix before applying
# Upload a file to minio s3 bucket | |
file_transfer = SimpleFileTransfer() | |
remote_file_path = file_transfer.upload_file(file_type="json") | |
@pytest.fixture | |
def remote_test_file(): | |
file_transfer = SimpleFileTransfer() | |
remote_file_path = file_transfer.upload_file(file_type="json") | |
yield remote_file_path | |
url = urlparse(remote_file_path) | |
bucket, key = url.netloc, url.path.lstrip("/") | |
file_transfer.delete_file(bucket=bucket, key=key) |
Code Review Run #82f00f
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) | ||
assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding error handling for the case where workflow execution fails. Currently, only checking for SUCCEEDED
phase but not handling other phases like FAILED
, ABORTED
, etc.
Code suggestion
Check the AI-generated fix before applying
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) | |
assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED, f"Execution failed with phase: {execution.closure.phase}" | |
execution = remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) | |
if execution.closure.phase == WorkflowExecutionPhase.FAILED: | |
raise RuntimeError(f"Workflow execution failed: {execution.closure.error}") | |
elif execution.closure.phase == WorkflowExecutionPhase.ABORTED: | |
raise RuntimeError("Workflow execution was aborted") | |
elif execution.closure.phase != WorkflowExecutionPhase.SUCCEEDED: | |
raise RuntimeError(f"Unexpected execution phase: {execution.closure.phase}") | |
assert execution.closure.phase == WorkflowExecutionPhase.SUCCEEDED |
Code Review Run #82f00f
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
self._downloader() | ||
self._downloaded = True | ||
return self.path | ||
return self._local_path |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a check for self._local_path
before returning it to ensure it's not None
or empty.
Code suggestion
Check the AI-generated fix before applying
return self._local_path | |
if not self._local_path: | |
raise ValueError("Local path is not initialized") | |
if not os.path.exists(self._local_path): | |
raise FileNotFoundError(f"Local path {self._local_path} does not exist") |
Code Review Run #82f00f
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
def downloader( | ||
ctx: FlyteContext, remote_path: typing.Union[str, os.PathLike], local_path: typing.Union[str, os.PathLike] | ||
) -> None: | ||
""" | ||
Download data from remote_path to local_path. | ||
|
||
We design the downloader as a static method because its behavior is logically | ||
related to this class but don't need to interact with class or instance data. | ||
""" | ||
ctx.file_access.get_data(remote_path, local_path, is_multipart=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding error handling in the downloader
method to handle potential file access or I/O errors that may occur during file download operations.
Code suggestion
Check the AI-generated fix before applying
@@ -744,10 +744,15 @@
def downloader(
ctx: FlyteContext, remote_path: typing.Union[str, os.PathLike], local_path: typing.Union[str, os.PathLike]
) -> None:
- """
Download data from remote_path to local_path.
- """
- ctx.file_access.get_data(remote_path, local_path, is_multipart=False)
+ try:
+ ctx.file_access.get_data(remote_path, local_path, is_multipart=False)
+ except Exception as e:
+ logger.error(f"Failed to download file from {remote_path} to {local_path}: {str(e)}")
+ raise
Code Review Run #82f00f
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
ff = FlyteFile(path=remote_file_path) | ||
with open(ff, "r") as f: | ||
content = f.read() | ||
print(f"FILE CONTENT | {content}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding error handling when opening and reading the file. The current implementation may raise exceptions if the file cannot be opened or read.
Code suggestion
Check the AI-generated fix before applying
ff = FlyteFile(path=remote_file_path) | |
with open(ff, "r") as f: | |
content = f.read() | |
print(f"FILE CONTENT | {content}") | |
ff = FlyteFile(path=remote_file_path) | |
try: | |
with open(ff, "r") as f: | |
content = f.read() | |
print(f"FILE CONTENT | {content}") | |
except (IOError, OSError) as e: | |
print(f"Error reading file: {e}") | |
raise |
Code Review Run #82f00f
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@mock.patch.dict(os.environ, { | ||
"AWS_ENDPOINT_URL": "http://localhost:30002", | ||
"AWS_ACCESS_KEY_ID": "minio", | ||
"AWS_SECRET_ACCESS_KEY": "miniostorage" | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can remove this after this PR.
#3001
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me fix it!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @arbaobao
can you add these to tests?
It mocks to cases
- create and read file from s3 in same task pod
- create and read file in different task pod (create in 1st pod, then pass it to 2nd pod to read)
import os
from flytekit import task, workflow
from flytekit.types.file import FlyteFile
# Remote path
remote_path = "s3://my-s3-bucket/s3_flyte_dir/example.txt"
@task
def create_ff() -> FlyteFile:
return FlyteFile(path=remote_path)
@task
def read_ff(ff: FlyteFile):
with open(ff, "r") as f:
content = f.read()
print("=============================")
print(content)
print("=============================")
@task
def create_and_read_ff() -> FlyteFile:
ff = FlyteFile(path=remote_path)
with open(ff, "r") as f:
content = f.read()
print("=============================")
print(content)
print("=============================")
return ff
@workflow
def wf():
f1 = create_ff()
read_ff(f1)
f2 = create_and_read_ff()
read_ff(f2)
if __name__ == "__main__":
from flytekit.clis.sdk_in_container import pyflyte
from click.testing import CliRunner
runner = CliRunner()
path = os.path.realpath(__file__)
result = runner.invoke(pyflyte.main, ["run", path, "wf"])
print(result.output)
Nice! I'll add these test cases. Thanks for your suggestion! |
Please refer to flyteorg#3001 Signed-off-by: JiaWei Jiang <[email protected]>
Create ff in one task pod and read it in another task pod. Signed-off-by: JiaWei Jiang <[email protected]>
Code Review Agent Run #75cc24Actionable Suggestions - 1
Review Details
|
with open(ff, "r") as f: | ||
content = f.read() | ||
print(f"FILE CONTENT | {content}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider consolidating the file reading logic in read_ff()
and create_and_read_ff()
to avoid code duplication. The file reading code block appears in both functions.
Code suggestion
Check the AI-generated fix before applying
@@ -12,9 +12,13 @@
+def _read_file_content(ff: FlyteFile) -> None:
+ with open(ff, "r") as f:
+ content = f.read()
+ print(f"FILE CONTENT | {content}")
+
def read_ff(ff: FlyteFile) -> None:
"""Read input FlyteFile.
This can be used in the case in which a FlyteFile is created
in another task pod and read in this task pod.
"""
- with open(ff, "r") as f:
- content = f.read()
- print(f"FILE CONTENT | {content}")
+ _read_file_content(ff)
@@ -33,9 +33,7 @@
"""
ff = FlyteFile(path=file_path)
- with open(ff, "r") as f:
- content = f.read()
- print(f"FILE CONTENT | {content}")
+ _read_file_content(ff)
Code Review Run #75cc24
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
amazing PR, learn a lot from you, let's make flyte better and better together.
really excited to collaborate with you.
Thanks man! There's always something to learn from you. Let's go! |
* Store protos in local cache (#3022) * Store proto obj instead of model Literal in local cache Signed-off-by: Eduardo Apolinario <[email protected]> * Remove unused file Signed-off-by: Eduardo Apolinario <[email protected]> --------- Signed-off-by: Eduardo Apolinario <[email protected]> Co-authored-by: Eduardo Apolinario <[email protected]> * Bump aiohttp from 3.9.5 to 3.10.11 (#3018) Bumps [aiohttp](https://github.com/aio-libs/aiohttp) from 3.9.5 to 3.10.11. - [Release notes](https://github.com/aio-libs/aiohttp/releases) - [Changelog](https://github.com/aio-libs/aiohttp/blob/master/CHANGES.rst) - [Commits](aio-libs/aiohttp@v3.9.5...v3.10.11) --- updated-dependencies: - dependency-name: aiohttp dependency-type: indirect ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Fix bug in FlyteDirectory.listdir on local files (#2926) * Fix issue in FlyteDirectory.listdir Fixes flyteorg/flyte#6005 Signed-off-by: Pim de Haan <[email protected]> * Added test Signed-off-by: Pim de Haan <[email protected]> * Run make lint Signed-off-by: Eduardo Apolinario <[email protected]> --------- Signed-off-by: Pim de Haan <[email protected]> Signed-off-by: Eduardo Apolinario <[email protected]> Co-authored-by: Eduardo Apolinario <[email protected]> * Fix unit tests in airflow plugin (#3024) Signed-off-by: Kevin Su <[email protected]> * fix: Fix resource meta typos for async agent (#3023) Signed-off-by: JiaWei Jiang <[email protected]> * fix: format commands output (#3026) * Fix pydantic basemodel default input (#3013) * Fix pydantic default input Signed-off-by: Future-Outlier <[email protected]> * add pydantic integration test Signed-off-by: Future-Outlier <[email protected]> * Use duck typing by Thomas's advice Signed-off-by: Future-Outlier <[email protected]> Co-authored-by: Thomas J. Fan <[email protected]> * lint Signed-off-by: Future-Outlier <[email protected]> --------- Signed-off-by: Future-Outlier <[email protected]> Co-authored-by: Thomas J. Fan <[email protected]> * [BUG] Open FlyteFile from remote path (#2991) * fix: Open FlyteFile from remote path Signed-off-by: JiaWei Jiang <[email protected]> * Add integration test Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Use ctx as param instead of recreation Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Clean test logic 1. Remove redundant prints 2. Use `mock.patch.dict` to setup `os.environ` for the current test fn * Avoid contaminating other tests running in the same process Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Setup local path and downloader in constructor Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Move SimpleFileTransfer to an utility file Signed-off-by: JiaWei Jiang <[email protected]> * Remove redundant env var setup Please refer to #3001 Signed-off-by: JiaWei Jiang <[email protected]> * test: Add another ff use case Create ff in one task pod and read it in another task pod. Signed-off-by: JiaWei Jiang <[email protected]> --------- Signed-off-by: JiaWei Jiang <[email protected]> * vllm inference plugin (#2967) * vllm inference plugin Signed-off-by: Daniel Sola <[email protected]> * fixed default value Signed-off-by: Daniel Sola <[email protected]> --------- Signed-off-by: Daniel Sola <[email protected]> * Add poetry to image spec (#3025) * Add poetry to image spec Signed-off-by: Thomas J. Fan <[email protected]> * Add stricter check Signed-off-by: Thomas J. Fan <[email protected]> --------- Signed-off-by: Thomas J. Fan <[email protected]> * [test] Add integration test for accessing sd sttr in dc (#2969) * test: Add integration test for attr access of sd Signed-off-by: JiaWei Jiang <[email protected]> * Correct file path Signed-off-by: JiaWei Jiang <[email protected]> * test: Support interaction with minio s3 bucket 1. Upload a local parquet file to minio s3 bucket 2. Access StructuredDataset attr from a dataclass 3. Open StructuredDataset from a remote path Signed-off-by: JiaWei Jiang <[email protected]> * Delete an unmerged integration test Signed-off-by: JiaWei Jiang <[email protected]> * Try imagespec with commit sha of corresponding fix Signed-off-by: JiaWei Jiang <[email protected]> * Remove redundant test Signed-off-by: JiaWei Jiang <[email protected]> * Remove default_factory and create sd dc from input uri Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Clean test logic 1. Remove redundant prints 2. Use `mock.patch.dict` to setup `os.environ` for the current test fn * Avoid contaminating other tests running in the same process Signed-off-by: JiaWei Jiang <[email protected]> * Remove redundant minio env var setup and add test comments Signed-off-by: JiaWei Jiang <[email protected]> * Support uploading tmp pqt file Signed-off-by: JiaWei Jiang <[email protected]> * Udpate deprecated module Signed-off-by: JiaWei Jiang <[email protected]> * Remove redundant and unused imports Signed-off-by: JiaWei Jiang <[email protected]> --------- Signed-off-by: JiaWei Jiang <[email protected]> --------- Signed-off-by: Eduardo Apolinario <[email protected]> Signed-off-by: dependabot[bot] <[email protected]> Signed-off-by: Pim de Haan <[email protected]> Signed-off-by: Kevin Su <[email protected]> Signed-off-by: JiaWei Jiang <[email protected]> Signed-off-by: Future-Outlier <[email protected]> Signed-off-by: Daniel Sola <[email protected]> Signed-off-by: Thomas J. Fan <[email protected]> Co-authored-by: Eduardo Apolinario <[email protected]> Co-authored-by: Eduardo Apolinario <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Pim de Haan <[email protected]> Co-authored-by: Kevin Su <[email protected]> Co-authored-by: 江家瑋 <[email protected]> Co-authored-by: V <[email protected]> Co-authored-by: Han-Ru Chen (Future-Outlier) <[email protected]> Co-authored-by: Thomas J. Fan <[email protected]> Co-authored-by: Daniel Sola <[email protected]>
* fix: Open FlyteFile from remote path Signed-off-by: JiaWei Jiang <[email protected]> * Add integration test Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Use ctx as param instead of recreation Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Clean test logic 1. Remove redundant prints 2. Use `mock.patch.dict` to setup `os.environ` for the current test fn * Avoid contaminating other tests running in the same process Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Setup local path and downloader in constructor Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Move SimpleFileTransfer to an utility file Signed-off-by: JiaWei Jiang <[email protected]> * Remove redundant env var setup Please refer to flyteorg#3001 Signed-off-by: JiaWei Jiang <[email protected]> * test: Add another ff use case Create ff in one task pod and read it in another task pod. Signed-off-by: JiaWei Jiang <[email protected]> --------- Signed-off-by: JiaWei Jiang <[email protected]> Signed-off-by: Shuying Liang <[email protected]>
* fix: Open FlyteFile from remote path Signed-off-by: JiaWei Jiang <[email protected]> * Add integration test Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Use ctx as param instead of recreation Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Clean test logic 1. Remove redundant prints 2. Use `mock.patch.dict` to setup `os.environ` for the current test fn * Avoid contaminating other tests running in the same process Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Setup local path and downloader in constructor Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Move SimpleFileTransfer to an utility file Signed-off-by: JiaWei Jiang <[email protected]> * Remove redundant env var setup Please refer to flyteorg#3001 Signed-off-by: JiaWei Jiang <[email protected]> * test: Add another ff use case Create ff in one task pod and read it in another task pod. Signed-off-by: JiaWei Jiang <[email protected]> --------- Signed-off-by: JiaWei Jiang <[email protected]> Signed-off-by: Shuying Liang <[email protected]>
* fix: Open FlyteFile from remote path Signed-off-by: JiaWei Jiang <[email protected]> * Add integration test Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Use ctx as param instead of recreation Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Clean test logic 1. Remove redundant prints 2. Use `mock.patch.dict` to setup `os.environ` for the current test fn * Avoid contaminating other tests running in the same process Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Setup local path and downloader in constructor Signed-off-by: JiaWei Jiang <[email protected]> * refactor: Move SimpleFileTransfer to an utility file Signed-off-by: JiaWei Jiang <[email protected]> * Remove redundant env var setup Please refer to flyteorg#3001 Signed-off-by: JiaWei Jiang <[email protected]> * test: Add another ff use case Create ff in one task pod and read it in another task pod. Signed-off-by: JiaWei Jiang <[email protected]> --------- Signed-off-by: JiaWei Jiang <[email protected]> Signed-off-by: Shuying Liang <[email protected]>
Tracking issue
Closes flyteorg/flyte#6090.
Why are the changes needed?
Directly opening a local or remote file with
FlyteFile
interface is quite useful. Also, this is commonly used with a context manager like:Following discuss both the local and remote cases:
FlyteFile
from a local pathIn this case, we don't need to download the file because it's already on the local file system. Hence, we just run a dummy downloading.
FlyteFile
from a remote path (our focus in this PR)As for the remote file, we definitely need to download the file from remote to our local file system before the file is accessed.
With this fix, users can directly open either a local or remote file in a consistent manner.
What changes were proposed in this pull request?
downloader
a static method ofFlyteFilePathTransformer
.FlyteFilePathTransformer
but doesn't need to interact with class or instance data.__fspath__
magic method ofFlyteFile
.How was this patch tested?
This patch is first tested with the following code snippet:
The result is shown as follows:
Setup process
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
This PR enhances the FlyteFile interface by implementing a delayed downloading mechanism for remote files and improving remote file operations handling. The changes include a new static downloader method, improved path handling functionality, and better separation between file creation and reading operations. The implementation removes AWS endpoint configuration mocks and provides a consistent interface for both local and remote file scenarios.Unit tests added: True
Estimated effort to review (1-5, lower is better): 2